MutableSharedFlow 随记

发布于 · 最后修改时间 · 标签: Kotlin

MutableSharedFlow 作为一个建立在 Flow 基础上的设计,它的 Shared 特性其实与 Flow 的 collect 有着设计上的冲突。
因为 Shared 特性,它的 emit 与它的订阅者有关系,订阅者的消费速度决定着它的发射速度。然而如果没有消费者,就意味着它的 emit 会直接丢失,而没有被消费到。
举个例子:

val sharedFlow = MutableSharedFlow<Int>();
launch {
    sharedFlow.collect {
        println(it) // 这里通常不会有任何打印
    }
}
sharedFlow.emit(1)

因为 launch 的执行需要时间,在这段时间里,emit 可能已经执行完毕了,从而导致发射的值没有被任何人消费从而丢失。
这对于将 MutableSharedFlow 直接作为 EventEmitter 的替代者来说,会是一个很严重的设计缺陷。


因此,如果要使用 MutableSharedFlow 实现 EventEmitter,需要将 MutableSharedFlow 作为一个间接方案来使用。


最简单的方案就是将 MutableSharedFlow 的 extraBufferCapacity 参数拉得非常高,等于开一个缓冲区来缓冲,这也许能解决 99%的问题。
但这种解决方式非常奇怪,就是明知道有问题,但是用暴力的方式来规避问题发生的概率。
这个方案唯一的好处就是代码量相对比较少。


还有一种方案是使用 Channel:

val channel = Channel<Int>();
val sharedFlow = channel.consumeAsFlow().shareIn(currentCoroutineContext(), SharingStarted.Lazily)
launch {
    sharedFlow.collect {
        println(it) // 这里通常不会有任何打印
    }
}
channel.send(1)

这个方案其实是创建了一个 Channel,两个 Flow。
其中一个 Flow 就是用来接收 Channel 发射出来的所有值并缓存起来(注意这个 flow 不会阻塞 channel)。
第二个 Flow 是通过 shareIn 创建出来的,它是 SharingStarted.Lazily,也就意味着只有在 sharedFlow.collect 执行的时候,上游的 flow 才会把值发射出来(注意,这里的发送是一次性全部发射出来)。

这个方案只能说勉勉强强达到我们的需求,但问题也很多:

  1. 它的 channel.send 并没有阻塞,而是全部被一个 flow 瞬间消费了。
  2. flow 在将数据发射给 sharedFlow 的时候也不被 sharedFlow 的订阅者阻塞,也就是说如果你同时进行两次 launch sharedFlow.collect ,即便第一个 sharedFlow 是在慢慢地消费,但后面那 sharedFlow 因为晚起,所以会丢失所有的数据。

    这个我也不理解它怎么会这样,大家可以自己做实验,这里贴出实验代码:

    val channel = Channel<Int>()
    val MAX = 5;
    launch {
      /// 所有的send,并不会被 collect 阻塞,consumeAsFlow/receiveAsFlow 已经将它全部消费
      delay(1000)
      println("start send")
      for (i in 1..MAX) {
        channel.send(i)
        println("send($i)")
      }
    }
    val flow = channel.receiveAsFlow().shareIn(this, SharingStarted.Lazily)
    launch {
      flow.collect {
        println("collect1($it)")
        delay(1000)
      }
    }
    launch {
      delay(2000)
      flow.collect {
        println("collect2($it)")
      }
    }
  3. 这里用 consumeAsFlow 还是 receiveAsFlow 都一样

但其实我们的希望是,MutableSharedFlow 能够顺序地进行 collect 与 emit。但是 collect 本身是一个阻塞函数,所以尝试从 emit 上入手:
试着在没有订阅者的时候,需要阻塞 emit 函数,直到有订阅者的时候才进入。
其实标准做法是这样的:

val sharedFlow = MutableSharedFlow<Int>().onSubscription {
    sharedFlow.emit(1)
}
launch {
    sharedFlow.collect {
        println(it) // 这里通常不会有任何打印
    }
}

但这其实就不是 EventEmitter 了,比方说我用它来实现一个 onStart,难不成要等到有人来订阅了才去触发 onStart?!并不是的,onStart 它本身是跟着上下文的需求该触发时就要触发。因此官方提供的 onSubscription 这个方案并不成立。